22. 实践课-视觉检测节点开发
视觉检测节点开发
关联:索引
- 场景提问:分拣产线相机拍到苹果,系统要把“苹果在哪、是什么等级、有无缺陷”变成可发布的数据,你觉得中间要经历哪些步骤?
1)目标检测输出到底是什么?
目标检测(Detection)的最小输出集合通常包含:
- 边界框(bbox):物体在图像上的位置(例如
x1,y1,x2,y2)。 - 置信度(confidence/score):模型对“这就是目标”的把握程度。
- 类别(class):例如
apple、defect。
工程上要牢记:检测模型输出的是“概率意义上的候选”,不是“百分百真实”。所以后处理要做阈值、去重、规则解释与验证。
2)YOLOv8 推理链路(从输入到框)
用一句话描述 YOLOv8(面向工程实现):
- 输入一张图像 → Backbone 提取特征 → Neck 融合多尺度特征 → Head 输出每个候选的类别与框 → NMS 去掉重复框 → 得到最终检测结果。
关键点(与节点开发相关):
-
多尺度:小目标/大目标检测依赖不同尺度特征,和“相机分辨率、目标距离”强相关。
-
Anchor-free:YOLOv8 采用更简化的框回归方式,工程上更关注输出的 bbox 坐标与置信度阈值设置。
-
NMS:同一苹果可能被多个框覆盖,需要去重;阈值不同会影响“漏检/误检”。
-
conf_thres:置信度阈值。越高越严格(误检少、漏检多);越低越宽松(误检多、漏检少)。 -
iou_thres:NMS 的 IoU 阈值。越低越容易“去重”(可能删掉相邻的两个真实目标);越高越保留更多框(可能重复)。 -
imgsz:推理输入尺寸。越大越准但越慢;CPU 环境往往要降到 640 或更低。
- 数据输入层:订阅
sensor_msgs/msg/Image(相机话题)。 - 推理层:调用 YOLOv8 推理,得到候选框。
- 后处理层:把检测框变成“业务可解释的指标”(果径、着色、缺陷)与最终等级。
- 输出层:封装成 ROS2 消息并发布(至少发布 2 路:原始检测结果 + 品质结果)。
本讲选择的“通用检测消息类型”是 vision_msgs/msg/Detection2DArray(更通用、便于其它节点复用)。其结构参考官方定义:header + detections[],每个 detection 里包含 bbox + results[](类别与置信度)等字段(见 ROS2 Humble 文档:Detection2D 与 Detection2DArray)。
-
输入:
sensor_msgs/msg/Image(相机) -
输出 1:
vision_msgs/msg/Detection2DArray(通用检测结果) -
输出 2:
apple_quality_msgs/msg/AppleQualityArray(业务品质结果) -
中间:推理层、后处理层分别做什么(用 2 句话写清)
-
组合 A:
conf_thres=0.8 -
组合 B:
conf_thres=0.3 -
组合 C:
iou_thres很低 / 很高(各自会发生什么) -
能把“现象”说成工程语言:漏检/误检/重复框、帧率下降、延迟上升。
-
能把“原因”关联到参数:阈值与 NMS 对结果的影响关系。
- 快问快答:视觉检测节点最少要订阅什么、发布什么?为什么我们要同时发布“原始检测结果”和“业务品质结果”?
1)安装 ROS 侧依赖(Ubuntu 22.04 + Humble)
sudo apt update
sudo apt install -y \
ros-humble-cv-bridge \
ros-humble-image-transport \
ros-humble-vision-msgs
解释与自检:
cv-bridge:把 ROS Image 转成 OpenCV 的numpy图像,或反向转换。image-transport:更推荐的图像话题传输方式(本讲可只用Image,但后续扩展常用)。vision-msgs:提供Detection2DArray等通用视觉消息,便于系统集成。- 自检 1:
ros2 pkg list | grep vision_msgs能看到vision_msgs。 - 自检 2:
ros2 interface show vision_msgs/msg/Detection2DArray能看到字段定义。
2)安装 YOLOv8(Ultralytics)Python 依赖(推荐方案:venv 固化环境)
本讲推荐用 venv 的原因:避免系统 Python 依赖冲突,且可把“能跑的环境”作为交付的一部分。
python3 -m venv --system-site-packages ~/venvs/yolo_ros2
source ~/venvs/yolo_ros2/bin/activate
python -m pip install -U pip
python -m pip install ultralytics opencv-python
解释与自检:
python3 -m venv --system-site-packages ...:创建虚拟环境目录,并复用系统 Python 的 site-packages(关键:让 venv 能 import 到 ROS2 的rclpy/cv_bridge等系统安装包)。source .../activate:激活虚拟环境,让python/pip指向 venv。ultralytics:YOLOv8 的 Python API(推理调用用它)。opencv-python:图像处理(着色比例、ROI 统计等会用到)。- 自检 1:
python -c "import ultralytics; import cv2; print('ultralytics=', ultralytics.__version__); print('opencv=', cv2.__version__)"
- 自检 2(建议做,确认 venv 同时具备 ROS2 与 YOLO 依赖):
source /opt/ros/humble/setup.bash
python -c "import rclpy; import cv_bridge; print('rclpy and cv_bridge ok')"
- 依赖冲突:cv_bridge需要numpy版本<2;opencv-python如果安装了较新版本,需要numpy>2。建议安装的版本numpy1.26.4, opencv-python版本为4.10.0.84。pytorch版本为2.11.0+cpu。
- 自检 3(可选,确认 YOLO 能加载模型;首次可能自动下载权重):
python - <<'PY'
from ultralytics import YOLO
m = YOLO('yolov8n.pt')
print('model loaded')
PY
- 如果你在 venv 中
colcon build,那么ros2 run生成的可执行脚本会绑定当前 Python 解释器路径;因此本讲建议:激活 venv → 再 colcon build → 再运行节点(避免“运行时找不到 ultralytics”)。
本讲的“品质结果”不建议强行塞进 Detection2DArray,原因是:品质字段(果径/着色/等级)属于业务扩展,应该和通用检测结果解耦,便于其它项目复用通用消息。
1)创建接口包(示例:apple_quality_msgs)
cd ~/ros2_ws/src
ros2 pkg create apple_quality_msgs --build-type ament_cmake
解释与自检:
- 接口包用于放
.msg/.srv/.action,符合你们在“自定义接口设计规范”课上学到的流程。 - 自检:创建后应有
package.xml、CMakeLists.txt等基础文件。
2)定义消息文件(两种:单个苹果 + 数组)
apple_quality_msgs/msg/AppleQuality.msg
std_msgs/Header header
string id
float32 center_x
float32 center_y
float32 size_x
float32 size_y
float32 diameter_mm
float32 red_ratio
float32 defect_score
string grade
header:时间戳与坐标系(对齐源图像)。center_x/center_y/size_x/size_y:像素坐标与尺寸(用于可视化与联动)。diameter_mm:估计果径(毫米;需要标定或 mm_per_px 参数)。red_ratio:着色比例(0~1)。defect_score:缺陷评分(0~1,越高越可能有缺陷)。grade:等级(例如A/B/C,规则由后处理层给出)。
apple_quality_msgs/msg/AppleQualityArray.msg
std_msgs/Header header
AppleQuality[] apples
解释:
header:对齐图像与检测时间戳。apples[]:一帧图像里可能有多个苹果。
3)在 CMakeLists.txt 与 package.xml 中启用 rosidl(关键步骤)
apple_quality_msgs/package.xml(关键依赖片段)
<buildtool_depend>ament_cmake</buildtool_depend>
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<depend>std_msgs</depend>
解释:
rosidl_default_generators:让.msg生成对应语言的代码。rosidl_default_runtime:运行时依赖,保证其它包能用到生成的消息。std_msgs:因为消息里引用了std_msgs/Header。
apple_quality_msgs/CMakeLists.txt(最小可用片段)
cmake_minimum_required(VERSION 3.8)
project(apple_quality_msgs)
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(std_msgs REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/AppleQuality.msg"
"msg/AppleQualityArray.msg"
DEPENDENCIES std_msgs
)
ament_export_dependencies(rosidl_default_runtime)
ament_package()
解释:
rosidl_generate_interfaces:声明要生成的消息文件列表。DEPENDENCIES std_msgs:告诉生成器这些消息依赖std_msgs的类型。
构建与自检:
cd ~/ros2_ws
colcon build --packages-select apple_quality_msgs
source install/setup.bash
ros2 interface show apple_quality_msgs/msg/AppleQuality
ros2 interface show ...输出的字段,必须和你写的.msg一致(字段名/类型对齐)。
本的检测节点输入是 sensor_msgs/msg/Image。但你们的“相机来源”可能有两种:
-
方案 B(仿真闭环):Gazebo Fortress(Ignition)相机 → 桥接到 ROS2 图像话题(适合后续驱动机械臂做闭环演示)
-
本次以“把检测节点与消息发布跑通”为主,所以统一采用方案 A。
-
方案 B 在你们后续做“检测结果驱动机械臂分拣”时更关键,因为它天然有仿真坐标系与
/clock,但需要额外的桥接与标定/坐标处理。
目标:把文件夹里的多张苹果真实照片,循环发布成 ROS2 Image 话题,例如 /real/camera/image_raw,让 YOLO 节点订阅它。
推荐话题命名(避免与仿真相机冲突):
- 真实图片发布:
/real/camera/image_raw - Gazebo 相机(若同时运行):
/sim/camera/image_raw
实现方式(最小可跑通):在你们的 Python 包里增加一个“图片播放器节点”,每隔 publish_hz 秒读取下一张图片并发布。
示例代码(可复制,发布文件夹图片到 ROS2):
import glob
from pathlib import Path
import cv2
import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
class ImageFolderPublisher(Node):
def __init__(self) -> None:
super().__init__("image_folder_publisher")
self.declare_parameter("image_dir", "")
self.declare_parameter("image_topic", "/real/camera/image_raw")
self.declare_parameter("publish_hz", 5.0)
self.declare_parameter("loop", True)
image_dir = self.get_parameter("image_dir").get_parameter_value().string_value
image_topic = self.get_parameter("image_topic").get_parameter_value().string_value
publish_hz = self.get_parameter("publish_hz").get_parameter_value().double_value
loop = self.get_parameter("loop").get_parameter_value().bool_value
if not image_dir:
raise RuntimeError("parameter image_dir is empty")
patterns = ["*.jpg", "*.jpeg", "*.png", "*.bmp", "*.webp"]
files: list[str] = []
for p in patterns:
files.extend(glob.glob(str(Path(image_dir) / p)))
files = sorted(files)
if not files:
raise RuntimeError(f"no image files found in {image_dir}")
self.files = files
self.loop = bool(loop)
self.index = 0
self.bridge = CvBridge()
self.pub = self.create_publisher(Image, image_topic, qos_profile_sensor_data)
self.timer = self.create_timer(1.0 / float(publish_hz), self.on_timer)
self.get_logger().info(f"publishing {len(self.files)} images from {image_dir} to {image_topic} at {publish_hz}Hz")
def on_timer(self) -> None:
if self.index >= len(self.files):
if not self.loop:
self.get_logger().info("finished publishing (loop=false), shutting down")
rclpy.shutdown()
return
self.index = 0
path = self.files[self.index]
self.index += 1
bgr = cv2.imread(path, cv2.IMREAD_COLOR)
if bgr is None:
self.get_logger().warn(f"failed to read image: {path}")
return
msg = self.bridge.cv2_to_imgmsg(bgr, encoding="bgr8")
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = "real_camera"
self.pub.publish(msg)
def main(args=None) -> None:
rclpy.init(args=args)
node = ImageFolderPublisher()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()
逐段解释与自检:
publish_hz:发布频率;CPU 推理慢时建议 1~5Hz,避免队列积压。qos_profile_sensor_data:让图像“尽量新鲜”,更符合相机流语义。msg.header.stamp:用节点时钟打时间戳;若你启用了use_sim_time并桥接了/clock,这里会自动跟随仿真时间。
- 将上述代码保存为:
~/ros2_ws/src/apple_vision_yolov8/apple_vision_yolov8/image_folder_publisher_node.py - 在
apple_vision_yolov8/setup.py的entry_points中增加一个可执行入口:
entry_points={
"console_scripts": [
"image_folder_publisher = apple_vision_yolov8.image_folder_publisher_node:main",
],
},
解释:
ros2 run apple_vision_yolov8 image_folder_publisher运行的就是这里注册的名字。apple_vision_yolov8.image_folder_publisher_node:main必须可导入,否则会出现入口点找不到。
source /opt/ros/humble/setup.bash
source ~/venvs/yolo_ros2/bin/activate
cd ~/ros2_ws
colcon build --symlink-install --packages-select apple_vision_yolov8
source install/setup.bash
ros2 run apple_vision_yolov8 image_folder_publisher --ros-args \
-p image_dir:=/path/to/apple_images_folder \
-p image_topic:=/real/camera/image_raw \
-p publish_hz:=5.0
ros2 topic info /real/camera/image_raw -v
ros2 topic echo /real/camera/image_raw --once
topic info -v能看到发布者,类型是sensor_msgs/msg/Image。echo --once能看到encoding: bgr8与header时间戳。
方案 B:Gazebo Fortress 相机作为输入(后续闭环用)
目标:让 Gazebo Fortress(Ignition)仿真相机发布图像,再通过桥接变成 ROS2 的 sensor_msgs/msg/Image,供 YOLO 节点订阅。
关键结论(你们需要提前知道):
- 这是“仿真闭环”的正确做法,但它得到的是仿真渲染画面,不是你电脑里文件夹的真实照片。
- 若你的 YOLO 模型只在真实照片上训练,直接用仿真画面推理可能识别效果变差(风格域差异)。工程上通常需要材质/光照调参或域随机化/再训练。
你需要的系统要素(不展开细节,见《16 Gazebo Fortress ... 适配基础》):
- Gazebo 发布仿真时间
/clock;ROS2 节点设置use_sim_time:=true - 使用
ros_ign_bridge或ros_gz_bridge桥接相机图像与/clock - 统一话题命名(建议
/sim/camera/image_raw)与 frame(建议camera_optical_frame)
本讲的最小闭环“先不做品质逻辑”,只做:
- 输入:订阅
/real/camera/image_raw(方案 A:文件夹真实图片发布得到) - 输出:发布
/sorting/perception/yolo_detections(vision_msgs/msg/Detection2DArray)
1)创建节点包(Python,示例:apple_vision_yolov8)
cd ~/ros2_ws/src
ros2 pkg create apple_vision_yolov8 --build-type ament_python --dependencies rclpy sensor_msgs vision_msgs cv_bridge
解释与自检:
ament_python:Python 节点包,便于快速迭代。sensor_msgs:订阅图像。vision_msgs:发布通用检测结果。cv_bridge:ROS Image ↔ OpenCV 图像转换。
2)核心节点代码(可复制版本)
建议文件路径:
~/ros2_ws/src/apple_vision_yolov8/apple_vision_yolov8/yolov8_detector_node.py
import time
import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from sensor_msgs.msg import Image
from vision_msgs.msg import Detection2DArray, Detection2D, BoundingBox2D, ObjectHypothesisWithPose
from cv_bridge import CvBridge
from ultralytics import YOLO
class Yolov8DetectorNode(Node):
def __init__(self) -> None:
super().__init__("yolov8_detector")
self.declare_parameter("image_topic", "/real/camera/image_raw")
self.declare_parameter("detections_topic", "/sorting/perception/yolo_detections")
self.declare_parameter("model_path", "yolov8n.pt")
self.declare_parameter("conf_thres", 0.5)
self.declare_parameter("device", "cpu")
image_topic = self.get_parameter("image_topic").get_parameter_value().string_value
detections_topic = self.get_parameter("detections_topic").get_parameter_value().string_value
model_path = self.get_parameter("model_path").get_parameter_value().string_value
conf_thres = self.get_parameter("conf_thres").get_parameter_value().double_value
device = self.get_parameter("device").get_parameter_value().string_value
self.bridge = CvBridge()
self.model = YOLO(model_path)
self.device = str(device)
self.conf_thres = float(conf_thres)
self.pub = self.create_publisher(Detection2DArray, detections_topic, 10)
self.sub = self.create_subscription(Image, image_topic, self.on_image, qos_profile_sensor_data)
self.get_logger().info(
f"yolov8_detector started. image_topic={image_topic} detections_topic={detections_topic} model={model_path} device={device}"
)
def on_image(self, msg: Image) -> None:
t0 = time.perf_counter()
frame_bgr = self.bridge.imgmsg_to_cv2(msg, desired_encoding="bgr8")
result = self.model.predict(frame_bgr, verbose=False, device=self.device, conf=self.conf_thres)[0]
det_arr = Detection2DArray()
det_arr.header = msg.header
names = result.names
boxes = result.boxes
if boxes is not None:
for i in range(len(boxes)):
b = boxes[i]
score = float(b.conf.item())
cls_id = int(b.cls.item())
cls_name = str(names.get(cls_id, str(cls_id)))
x1, y1, x2, y2 = [float(v) for v in b.xyxy[0].tolist()]
cx = (x1 + x2) / 2.0
cy = (y1 + y2) / 2.0
sx = (x2 - x1)
sy = (y2 - y1)
det = Detection2D()
det.header = msg.header
det.id = str(i)
det.bbox = BoundingBox2D()
det.bbox.center.position.x = cx
det.bbox.center.position.y = cy
det.bbox.center.theta = 0.0
det.bbox.size_x = sx
det.bbox.size_y = sy
hyp = ObjectHypothesisWithPose()
hyp.hypothesis.class_id = cls_name
hyp.hypothesis.score = score
det.results.append(hyp)
det_arr.detections.append(det)
self.pub.publish(det_arr)
dt_ms = (time.perf_counter() - t0) * 1000.0
self.get_logger().info(f"published {len(det_arr.detections)} detections, latency={dt_ms:.1f}ms")
def main(args=None) -> None:
rclpy.init(args=args)
node = Yolov8DetectorNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()
- 参数区:把话题名/模型路径/阈值/设备做成参数,是为了“同一套代码适配不同相机与不同模型”,便于团队协作。
qos_profile_sensor_data:适合高频传感器数据,强调“尽量新鲜”,减少排队延迟。CvBridge.imgmsg_to_cv2(..., bgr8):得到 OpenCV 的 BGR 图像;若你的相机是rgb8或mono8,这里需要调整(见常见错误)。result = self.model.predict(...)[0]:执行一次推理并拿到当前帧结果;device/conf在这里显式指定,避免环境差异导致“跑到了错误设备或阈值未生效”。- bbox 坐标:YOLO 输出
xyxy,这里转换成BoundingBox2D的center + size(像素单位,允许浮点)。 ObjectHypothesisWithPose:只填class_id与score已足够用于 2D 检测结果发布;pose 可留默认值(工程上可扩展)。- 日志
latency:作为性能证据,后续你们要用它判断“推理过慢导致卡顿”。
3)配置入口点(setup.py)
在 apple_vision_yolov8/setup.py 的 entry_points 添加:
entry_points={
"console_scripts": [
"yolov8_detector = apple_vision_yolov8.yolov8_detector_node:main",
],
},
解释:
ros2 run <pkg> <exec>运行的就是这里注册的console_scripts名称。pkg.module:function必须可导入、函数名要对齐,否则会出现“入口点找不到”的错误。
source /opt/ros/humble/setup.bash
source ~/venvs/yolo_ros2/bin/activate
cd ~/ros2_ws
colcon build --symlink-install --packages-select apple_vision_yolov8
source install/setup.bash
ros2 run apple_vision_yolov8 yolov8_detector --ros-args -p image_topic:=/real/camera/image_raw
解释:
- 先
source /opt/ros/...:保证 ROS2 环境变量正确。 - 再激活 venv:保证运行时能 import
ultralytics。 --ros-args -p ...:运行时覆盖参数,不用改源码。
ros2 topic list | grep yolo
ros2 topic info /sorting/perception/yolo_detections -v
ros2 topic echo /sorting/perception/yolo_detections --once
ros2 topic hz /sorting/perception/yolo_detections
你应该能说清的“证据解释”:
topic info -v:看发布者/订阅者与类型是否正确(类型必须是vision_msgs/msg/Detection2DArray)。topic echo --once:证明消息字段真的在发(至少看到detections:列表)。topic hz:给出发布频率;若很低,说明推理耗时或相机帧率低。
- 复盘:我们已经发布了“检测框”,但分拣系统真正需要的是“可用的业务指标”。你认为哪些指标最能决定苹果的分级?
1)果径(diameter)的估计:必须说明“单位来源”
- 以 bbox 的较大边作为直径像素
diameter_px = max(size_x, size_y)。 - 用参数
mm_per_px转换:diameter_mm = diameter_px * mm_per_px。
注意:
mm_per_px需要标定(例如用标定板/已知尺寸物体在固定距离下求比例)。本讲不要求做完整相机标定,但必须把“单位来源”写进报告。
2)着色(red_ratio):把“红”定义成可计算的阈值
- 取 ROI(bbox 区域)→ 转 HSV → 在红色范围内做 mask →
red_ratio = red_pixels / roi_pixels。 - 红色 HSV 常用两段阈值(因为色相环跨 0):
H ∈ [0,10]或H ∈ [170,180],并结合S/V下限过滤灰暗区域。
3)缺陷(defect_score):先给出“可落地”的最小规则
两种落地方式(本讲建议 A,B 作为扩展):
-
A(推荐):用 YOLO 模型直接检测缺陷类别(例如
defect),用最高置信度作为defect_score。 -
B(扩展):在 ROI 内做简单纹理/暗斑统计,作为缺陷概率(可解释,但容易受光照影响)。
-
输出 1:
/sorting/perception/yolo_detections(不变,用于通用检测) -
输出 2:
/sorting/perception/apple_quality(新,业务品质结果)
1)在节点包中增加对接口包的依赖
创建依赖前,先构建并 source 接口包(否则 Python 找不到消息类型):
cd ~/ros2_ws
colcon build --packages-select apple_quality_msgs
source install/setup.bash
然后在创建节点包时或事后,确保依赖中包含 apple_quality_msgs(package.xml 与 setup.py 的 install_requires/依赖声明要对齐,你们可按“工作空间规范课”的口径修改)。
2)品质后处理函数(可复制)
将下面函数放到 yolov8_detector_node.py 同文件中(或拆到 utils.py,推荐拆分以体现模块化思维)。
import cv2
import numpy as np
def compute_red_ratio(bgr_roi: np.ndarray) -> float:
if bgr_roi.size == 0:
return 0.0
hsv = cv2.cvtColor(bgr_roi, cv2.COLOR_BGR2HSV)
lower1 = np.array([0, 80, 50], dtype=np.uint8)
upper1 = np.array([10, 255, 255], dtype=np.uint8)
lower2 = np.array([170, 80, 50], dtype=np.uint8)
upper2 = np.array([180, 255, 255], dtype=np.uint8)
mask1 = cv2.inRange(hsv, lower1, upper1)
mask2 = cv2.inRange(hsv, lower2, upper2)
mask = cv2.bitwise_or(mask1, mask2)
red_pixels = int(np.count_nonzero(mask))
total_pixels = int(mask.size)
return float(red_pixels) / float(total_pixels)
def estimate_diameter_mm(size_x_px: float, size_y_px: float, mm_per_px: float) -> float:
diameter_px = max(float(size_x_px), float(size_y_px))
return diameter_px * float(mm_per_px)
def grade_apple(diameter_mm: float, red_ratio: float, defect_score: float) -> str:
if defect_score >= 0.6:
return "C"
if diameter_mm >= 80.0 and red_ratio >= 0.6:
return "A"
if diameter_mm >= 70.0 and red_ratio >= 0.4:
return "B"
return "C"
逐段解释与自检:
compute_red_ratio:把“红”固化成 HSV 阈值;S/V下限用于剔除灰暗/阴影区域,减少误判。
3)发布品质消息(可复制核心片段)
在节点中新增:
- 参数:
quality_topic、mm_per_px - 发布者:
AppleQualityArray - 在循环中对每个 apple detection 计算 ROI 指标并填充消息
示例片段(展示关键逻辑,要求你们能把它集成进自己的节点并跑通):
from apple_quality_msgs.msg import AppleQuality, AppleQualityArray
# ... 在 __init__ 里新增参数与 publisher(示例)
# self.declare_parameter("quality_topic", "/sorting/perception/apple_quality")
# self.declare_parameter("mm_per_px", 0.2)
# self.quality_pub = self.create_publisher(AppleQualityArray, quality_topic, 10)
# ... 在 on_image() 里:在发布 det_arr 后,生成 quality_arr(示例)
quality_arr = AppleQualityArray()
quality_arr.header = msg.header
defect_best = {}
for det in det_arr.detections:
if not det.results:
continue
cls = det.results[0].hypothesis.class_id
score = float(det.results[0].hypothesis.score)
if cls == "defect":
defect_best[det.id] = max(defect_best.get(det.id, 0.0), score)
for det in det_arr.detections:
if not det.results:
continue
cls = det.results[0].hypothesis.class_id
if cls != "apple":
continue
cx = float(det.bbox.center.position.x)
cy = float(det.bbox.center.position.y)
sx = float(det.bbox.size_x)
sy = float(det.bbox.size_y)
x1 = int(max(0.0, cx - sx / 2.0))
y1 = int(max(0.0, cy - sy / 2.0))
x2 = int(min(frame_bgr.shape[1] - 1, cx + sx / 2.0))
y2 = int(min(frame_bgr.shape[0] - 1, cy + sy / 2.0))
roi = frame_bgr[y1:y2, x1:x2]
red_ratio = compute_red_ratio(roi)
diameter_mm = estimate_diameter_mm(sx, sy, self.mm_per_px)
defect_score = float(defect_best.get(det.id, 0.0))
grade = grade_apple(diameter_mm, red_ratio, defect_score)
aq = AppleQuality()
aq.header = msg.header
aq.id = det.id
aq.center_x = cx
aq.center_y = cy
aq.size_x = sx
aq.size_y = sy
aq.diameter_mm = float(diameter_mm)
aq.red_ratio = float(red_ratio)
aq.defect_score = float(defect_score)
aq.grade = grade
quality_arr.apples.append(aq)
self.quality_pub.publish(quality_arr)
逐段解释(你们需要能讲清的要点):
- ROI 裁剪:用 bbox
center/size还原x1,y1,x2,y2,并做边界裁剪,避免越界崩溃。 - 消息封装:
AppleQuality是业务消息,字段要自洽、可解释;grade规则要能复核。
ros2 topic info /sorting/perception/apple_quality -v
ros2 topic echo /sorting/perception/apple_quality --once
你在说明里至少回答 3 点:
- 你如何从 bbox 得到果径(单位怎么来的)?
- 你如何定义“红”(阈值怎么选的)?
- 你如何得到缺陷评分(模型类别或规则来源)?
- 复盘:现在我们“能跑”了,但工程交付还缺什么?(提示:参数化、性能、鲁棒性、证据链)
你是 ROS2 Humble + Python 视觉节点开发专家。请基于以下约束,生成一个可运行的 YOLOv8 视觉检测节点(ament_python)。
输入:
- 订阅 sensor_msgs/msg/Image(参数 image_topic)
输出(必须同时发布 2 路话题):
- /sorting/perception/yolo_detections:vision_msgs/msg/Detection2DArray
- /sorting/perception/apple_quality:apple_quality_msgs/msg/AppleQualityArray(字段:center/size/diameter_mm/red_ratio/defect_score/grade)
工程约束:
1) ROS2:Humble;Python:rclpy;图像转换:cv_bridge;推理:ultralytics.YOLO
2) 参数化:model_path、conf_thres、device、mm_per_px、red_hsv_thresholds(建议可配置)
3) 性能:避免在回调中做长时间阻塞;给出日志打印每帧推理耗时
4) 验收:给出 colcon build、ros2 run、ros2 topic echo/hz 的命令与期望现象
5) 代码要包含必要注释,并说明每个参数的作用
解释:
2)审计清单(必须写进作业的“审计优化痕迹”)
- 依赖是否齐全:
package.xml/setup.py/import 是否一致? - 话题/类型是否对齐:
ros2 topic info -v输出类型是否正确? - 参数是否生效:
--ros-args -p是否能覆盖默认值? - 异常边界是否处理:空检测、越界 ROI、模型路径不存在、图像编码不匹配?
- 性能证据是否存在:日志是否打印 latency;是否避免无意义的频繁打印导致卡顿?
- 参数化阈值:把
grade_apple的阈值(80/70、0.6/0.4、defect 0.6)做成 ROS2 参数,运行时可调。 - 降低阻塞:将推理从订阅回调移到定时器(例如保存最新帧、按固定频率推理),避免相机高帧率时回调堆积。
- 日志降噪:每 N 帧打印一次 latency;或在 latency 超过阈值时才 warn。
回归测试口径(每次改完必须跑):
ros2 topic echo /sorting/perception/yolo_detections --once
ros2 topic echo /sorting/perception/apple_quality --once
ros2 topic hz /sorting/perception/yolo_detections
你需要能解释的“回归结论”:
-
改动没有破坏话题类型与字段(echo 仍能看到字段)。
-
发布频率与 latency 是否改善(hz 与日志对比)。
-
西北勒苹果在采后分级、包装、流通环节对“外观品质稳定性”要求高。传统人工分拣受疲劳、主观性影响大,品质一致性难保证。
-
智能视觉检测(目标检测 + 缺陷识别 + 着色/果径量化)能把“经验判断”转成“可追溯的数据”,支撑标准化分级,提高优果率与附加值。
-
技术落地要服务地方产业:不仅要“能跑模型”,更要能把结果做成稳定的 ROS2 数据接口,方便与输送带、机械臂、调度系统协同,形成可复制的产线方案。
收束到工程职业素养(本讲统一口径):
- 对 AI 生成代码要负责:能跑不等于正确;必须审计、测试、保留证据。
- 对产业场景要负责:指标定义要可解释、可复核、可追溯;避免“黑盒输出无法落地”。
ModuleNotFoundError: ultralytics
- 原因:运行节点时未激活 venv,或 build 时与运行时 Python 环境不一致。
- 修复:
source ~/venvs/yolo_ros2/bin/activate后重新colcon build,再ros2 run。
CvBridgeError: ... encoding
- 原因:相机话题编码不是
bgr8。 - 修复:用
ros2 topic echo /camera/image_raw --once看encoding,并把desired_encoding改为匹配值(如rgb8),或统一转换到 BGR。
- 话题存在但
echo没有数据
- 排查:先
ros2 node list/ros2 topic list,再ros2 topic info -v看类型是否对齐;最后用rqt_graph看连接关系。 - 常见原因:话题名不一致、节点没 source 工作空间、入口点未配置。
- 推理太慢导致 hz 很低
- 处理:降低
imgsz、提高conf_thres、减少发布日志频率、改为定时器推理;有 GPU 则设置device:=cuda:0(前提是环境已配置)。
参考与延伸(建议课后自学)
- ROS2 Humble vision_msgs Detection2D:https://docs.ros.org/en/humble/p/vision_msgs/msg/Detection2D.html
- ROS2 Humble vision_msgs Detection2DArray:https://docs.ros.org/en/humble/p/vision_msgs/msg/Detection2DArray.html
- ROS2 Humble vision_msgs BoundingBox2D:https://docs.ros.org/en/humble/p/vision_msgs/msg/BoundingBox2D.html
- Ultralytics YOLO(官方文档):https://docs.ultralytics.com/